# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Setup for data transfer/redistribution between topologies or operators
`.. currentmodule : hysop.operator.redistribute
* :class:`~Redistribute` generate the optimal set of RedistributeOperatorBase instances
for one or multiple variables given candidate source topolgies and one output topology.
"""
from abc import ABCMeta, abstractmethod
from hysop.constants import DirectionLabels
from hysop.tools.htypes import check_instance, to_set, to_tuple, first_not_None
from hysop.tools.decorators import debug
from hysop.fields.continuous_field import Field
from hysop.topology.topology import Topology
from hysop.topology.cartesian_topology import CartesianTopology
from hysop.core.mpi.redistribute import (
RedistributeIntra,
RedistributeInter,
RedistributeInterParam,
RedistributeOperatorBase,
)
from hysop.core.graph.node_generator import ComputationalGraphNodeGenerator
[docs]
class RedistributeNotImplementedError(Exception):
pass
[docs]
class Redistribute(ComputationalGraphNodeGenerator):
"""Node generator generating redistribute operators."""
__redistribute_operators = {
RedistributeIntra: 0,
RedistributeInter: 1,
# 2: RedistributeOverlap,
}
"""
Implemented redistribute operators,
keys are classes that inerit
hysop.core.mpi.redistribute.RedistributeOperatorBase
values are operator priority (smaller value has more priority),
"""
for cls in __redistribute_operators.keys():
assert issubclass(
cls, RedistributeOperatorBase
), f"{cls} is not a RedistributeOperatorBase."
def __new__(
cls,
variables,
source_topos,
target_topo,
components=None,
name=None,
pretty_name=None,
base_kwds=None,
**kwds,
):
base_kwds = first_not_None(base_kwds, {})
return super().__new__(
cls,
name=name,
pretty_name=pretty_name,
candidate_input_tensors=None,
candidate_output_tensors=None,
**base_kwds,
)
def __init__(
self,
variables,
source_topos,
target_topo,
components=None,
name=None,
pretty_name=None,
base_kwds=None,
**kwds,
):
"""
Initialize a Redistribute operator generator.
Parameters
----------
variables: :class:`~hysop.field.continuous.Field` or array like of continuous fields.
the continuous variables to be distributed
source_topos: :class:`~hysop.topology.topology.Topology` or array like of topologies, or dict(field, topologies)
candidate source mesh topologies (for each field the optimal source topology will be choosed)
target_topo: :class:`~hysop.topology.topology.Topology` or dict(Field, Topology)
target mesh topology for all variables (or per variable if a dictionnary is passed)
name: string
prefix for generated operator names
pretty_name: string
pretty prefix for generated operator names
base_kwds: dict, optional, defaults to None
Base class keywords arguments.
If None, an empty dict will be passed.
kwds:
Keywords arguments that will be passed towards implementation
redistribute operator __init__.
"""
assert "source_topo" not in kwds
base_kwds = first_not_None(base_kwds, {})
variables = to_tuple(variables)
super().__init__(
name=name,
pretty_name=pretty_name,
candidate_input_tensors=variables,
candidate_output_tensors=variables,
**base_kwds,
)
# format variables to a set of variables
variables = to_set(variables)
check_instance(variables, set, values=Field)
# format source topos to a dict(Field, set(Topology))
if isinstance(source_topos, dict):
for k, v in source_topos:
if not isinstance(v, set):
source_topos[k] = to_set(v)
else:
source_topos = to_set(source_topos)
source_topos = dict(zip(variables, (source_topos,) * len(variables)))
check_instance(source_topos, dict, keys=Field, values=set)
for v in source_topos.values():
check_instance(v, set, values=(Topology, type(None)))
# format target_topo to a dict(Field, Topology)
if not isinstance(target_topo, dict):
check_instance(target_topo, Topology, allow_none=True)
target_topo = dict(zip(variables, (target_topo,) * len(variables)))
check_instance(target_topo, dict, keys=Field, values=(Topology, type(None)))
# format components to a dict(Field, set(int)|None)
if not isinstance(components, dict):
if components is not None:
components = to_set(components)
components = dict(zip(variables, (components,) * len(variables)))
check_instance(components, dict, keys=Field)
for v in components.values():
check_instance(v, set, values=int, allow_none=True)
self._variables = variables
self._source_topos = source_topos
self._target_topo = target_topo
self._components = components
self._kwds = kwds
@debug
def _generate(self):
nodes = []
for var in self._variables:
source_topos = self._source_topos[var]
target_topo = self._target_topo[var]
components = self._components[var]
kwds = self._kwds.copy()
# if source topology is destination topology there is nothing to be done
if target_topo in source_topos:
continue
# else we find the most suitable source topology
node = self._select_redistribute(
variable=var,
source_topos=source_topos,
target_topo=target_topo,
components=components,
name=self.name,
pretty_name=self.pretty_name,
**kwds,
)
nodes.append(node)
return nodes
@staticmethod
def _select_redistribute(variable, source_topos, target_topo, components, **kwds):
assert target_topo not in source_topos
best_redis = None
for source_topo in source_topos:
redis = Redistribute._get_compatible_redistribute(
variable, source_topo, target_topo, components, **kwds
)
best_redis = Redistribute._select_best_operator(best_redis, redis)
if best_redis is None:
msg = "Failed to find a suitable redistribute operator for variables {} "
msg += "between sources topologies and destination topology.\n"
msg = msg.format(variable.name)
for i, st in enumerate(source_topos):
msg += f"\n::CANDIDATE SOURCE TOPOLOGY {i}::\n"
msg += str(st)
msg += "\n::DESTINATION TOPOLOGY::\n" + str(target_topo)
msg += "\n"
raise RedistributeNotImplementedError(msg)
return best_redis
@staticmethod
def _select_best_operator(lhs, rhs):
# select highest priority operator when there are more
# than one candidate source topology
if lhs is None:
return rhs
if rhs is None:
return lhs
check_instance(lhs, RedistributeOperatorBase)
check_instance(rhs, RedistributeOperatorBase)
lhs_priority = Redistribute.__redistribute_operators[type(lhs)]
rhs_priority = Redistribute.__redistribute_operators[type(rhs)]
if lhs_priority <= rhs_priority:
return lhs
else:
return rhs
@staticmethod
def _get_compatible_redistribute(
variable, source_topo, target_topo, components, **kwds
):
# look from highest prority operator to smallest priority operators
# if nothing is found return None
for cls, _ in sorted(
Redistribute.__redistribute_operators.items(), key=lambda x: x[1]
):
if cls.can_redistribute(
source_topo=source_topo, target_topo=target_topo, **kwds
):
return cls(
variable=variable,
source_topo=source_topo,
target_topo=target_topo,
components=components,
**kwds,
)
return None